d04d3ceaf9a678cd962690b8d45df5b96b150106,bridge/src/main/java/io/ppatierno/kafka/bridge/SinkBridgeEndpoint.java,KafkaConsumerRunner,run,#,211

Before Change


				    	DeliveryOptions options = new DeliveryOptions();
						options.addHeader(SinkBridgeEndpoint.EVENT_BUS_HEADER_COMMAND, SinkBridgeEndpoint.EVENT_BUS_SEND_COMMAND);
						
				    	this.vertx.eventBus().send(this.ebQueue, "", options);
				    }
				}
			} catch (WakeupException e) {

After Change


				    	DeliveryOptions options = new DeliveryOptions();
						options.addHeader(SinkBridgeEndpoint.EVENT_BUS_HEADER_COMMAND, SinkBridgeEndpoint.EVENT_BUS_SEND_COMMAND);
						
						if (this.qos == ProtonQoS.AT_MOST_ONCE) {
							
							// Sender QoS settled (AT_MOST_ONCE) : commit immediately and start message sending
							try {
								
								// 1. immediate commit 
								this.consumer.commitSync();
								
								// 2. commit ok, so we can enqueue record for sending
								for (ConsumerRecord<String, byte[]> record : records)  {
							        
							    	LOG.info("Received from Kafka partition {} [{}], key = {}, value = {}", record.partition(), record.offset(), record.key(), new String(record.value()));
							    	this.queue.add(record);				    	
							    }
								
								// 3. start message sending
								this.vertx.eventBus().send(this.ebQueue, "", options);
								
							} catch (Exception e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
							}
							
							
						} else {
							
							// Sender QoS unsettled (AT_LEAST_ONCE) : start message sending, wait end and commit
							
							// 1. enqueue record for sending
							for (ConsumerRecord<String, byte[]> record : records)  {
						        
						    	LOG.info("Received from Kafka partition {} [{}], key = {}, value = {}", record.partition(), record.offset(), record.key(), new String(record.value()));
						    	this.queue.add(record);				    	
						    }
							
							// 2. start message sending
							this.vertx.eventBus().send(this.ebQueue, "", options, ar -> {
								
								// 4. commit
								this.consumer.commitSync();
							});
							
							// TODO : consider timeout ??
							try {